package com.shujia.flink.kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo1KafkaSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val properties = new Properties()
    //broler地址列表
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    //消费者组，同一条数据在一个组内只处理一次
    properties.setProperty("group.id", "test")


    //创建消费者
    val flinkKakfaConsumer = new FlinkKafkaConsumer[String](
      "words", //指定topic
      new SimpleStringSchema(), //指定数据格式
      properties //指定配置文件对象
    )


    flinkKakfaConsumer.setStartFromEarliest() // 尽可能从最早的记录开始
    //flinkKakfaConsumer.setStartFromLatest()        // 从最新的记录开始
    //flinkKakfaConsumer.setStartFromTimestamp()  // 从指定的时间开始（毫秒）
    //flinkKakfaConsumer.setStartFromGroupOffsets()  // 默认的方法, 按照消费者组读取数据，如果消费者组第一次使用，默认读取最新的数据


    //使用kafka source   -- 无界流
    val kafkaDS: DataStream[String] = env.addSource(flinkKakfaConsumer)


    kafkaDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()


    env.execute()

  }

}
